package cKafka

import (
	"context"
	"errors"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/IBM/sarama/tools/tls"
	"github.com/gin-gonic/gin"
	"github.com/google/uuid"

	"gitee.com/csingo/cContext"
	"gitee.com/csingo/cHelper"
	"gitee.com/csingo/cLog"
)

// ConsumerType selects how a consumer reads messages: directly from explicit
// partitions, or through a sarama consumer group.
type ConsumerType string

const (
	// CONSUMER_TYPE_PARTITION consumes explicit partitions at a caller-chosen offset.
	CONSUMER_TYPE_PARTITION ConsumerType = "partition"
	// CONSUMER_TYPE_GROUP consumes via a sarama consumer group (broker-managed offsets).
	CONSUMER_TYPE_GROUP     ConsumerType = "group"
)

// ConsumerOption carries everything needed to build (or look up) a consumer.
// GetID derives the container keys from these fields.
type ConsumerOption struct {
	ID         string         // caller-supplied identity; md5-hashed into the client/consumer IDs
	Type       ConsumerType   // partition or group consumption
	Config     *sarama.Config // sarama client configuration
	Brokers    []string       // kafka broker addresses
	Topic      string         // topic to consume
	Group      string         // consumer-group name (group mode only)
	Partitions []int32        // explicit partitions (partition mode); empty means "all partitions"
	Offset     int64          // starting offset (partition mode)
	Size       int64          // batch size; <= 0 handles each message immediately
	Wait       int64          // flush timeout in seconds; <= 0 disables the flush ticker
	Multi      bool           // append a random UUID to the IDs so several identical consumers can coexist
}

// GetID derives the container keys for this option: clientId identifies the
// underlying broker connection, consumerId additionally encodes every
// consumption parameter. When Multi is set, a random UUID suffix is appended
// to both so identical configurations still get distinct entries.
func (i *ConsumerOption) GetID() (clientId, consumerId string) {
	hash := cHelper.Md5([]byte(i.ID))
	brokerList := strings.Join(i.Brokers, ",")

	clientId = hash + ":" + brokerList
	consumerId = fmt.Sprintf("%s:%s:%s:%s:%v:%d:%d:%d",
		hash, brokerList, i.Type, i.Topic, i.Partitions, i.Offset, i.Size, i.Wait)

	if i.Multi {
		suffix := ":" + uuid.New().String()
		clientId += suffix
		consumerId += suffix
	}

	return clientId, consumerId
}

// consumer wraps a shared sarama client and implements both partition-based
// and group-based consumption, with optional message batching (size) and a
// flush timeout (wait).
type consumer struct {
	id         string       // unique identity (see ConsumerOption.GetID)
	typ        ConsumerType // partition or group mode
	borkers    []string     // broker addresses (sic: field-name typo kept — renaming would touch every caller)
	topic      string       // topic consumed
	group      string       // consumer-group name (group mode)
	partitions []int32      // partitions consumed (partition mode)
	offset     int64        // starting offset (partition mode)
	size       int64        // batch size; <= 0 means per-message handling
	wait       int64        // flush timeout in seconds; <= 0 disables the ticker
	multi      bool         // IDs carry a UUID suffix (multiple identical instances)

	// Handler receives aggregated batches; GroupHandler is the sarama group
	// callback (DefaultGroupHandler forwards into the batching pipeline).
	Handler      func(ctx *gin.Context, msgs []*sarama.ConsumerMessage)
	GroupHandler sarama.ConsumerGroupHandler

	config   *sarama.Config
	client   sarama.Client
	clientId string // key of the shared client in the container

	groupConsumer     sarama.ConsumerGroup
	partitionConsumer sarama.Consumer

	partitionWait    *sync.WaitGroup          // tracks per-partition fetch goroutines
	partitionProcess []sarama.PartitionConsumer
	message          chan *sarama.ConsumerMessage // fetchers -> aggregator
	messages         []*sarama.ConsumerMessage    // current batch buffer
	send             bool // set by the ticker: flush the buffer now
	ticker           *time.Ticker
	deleted          bool // already removed from the container (see Delete/Close)
	running          bool // a Consume call is active
	multiLock        *sync.Mutex // guards the running flag

	// Stop flags polled by the worker goroutines.
	// NOTE(review): these bools (and send/running) are written and read from
	// different goroutines without synchronization — a data race under
	// `go test -race`; confirm whether atomics are needed.
	tickerProcessClose  bool
	handleProcessClose  bool
	forwardProcessClose bool
}

// ID returns the consumer's unique identity string.
func (i *consumer) ID() string {
	return i.id
}

// Close shuts the consumer down: it flips the goroutine stop flags, closes
// the underlying sarama consumer(s) for the active mode, and — unless
// Delete() was called first — removes this operator from the container.
//
// NOTE(review): the flags are plain bools read by worker goroutines without
// synchronization; also, if construction failed partway, partitionConsumer /
// groupConsumer could be nil here — confirm callers never reach that state.
func (i *consumer) Close(ctx *gin.Context) {
	cLog.WithContext(ctx, map[string]interface{}{
		"source": "cKafka.consumer.Close",
		"topic":  i.topic,
	}).Trace("cKafka 捕获 consumer 结束信号")

	i.running = false
	i.forwardProcessClose = true
	i.tickerProcessClose = true
	i.handleProcessClose = true

	switch i.typ {
	case CONSUMER_TYPE_PARTITION:
		// Close each per-partition consumer first.
		for _, pc := range i.partitionProcess {
			pc.AsyncClose()
		}
		// Then close the partition-consumer connection itself.
		if err := i.partitionConsumer.Close(); err != nil {
			cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.Close",
				"topic":  i.topic,
				"err":    err.Error(),
			}).Error("Kafka partition consumer 停止失败")
		}
	case CONSUMER_TYPE_GROUP:
		// Close the consumer-group connection.
		if err := i.groupConsumer.Close(); err != nil {
			cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.Close",
				"topic":  i.topic,
				"err":    err.Error(),
			}).Error("Kafka group consumer 停止失败")
		}
	}

	if !i.deleted {
		container.deleteOperator(i.clientId, i.id)
	}
}

// Delete marks the consumer as already removed from the container, so a
// subsequent Close() will skip container.deleteOperator.
func (i *consumer) Delete() {
	i.deleted = true
}

// Consume starts the blocking consume loop for this consumer's mode.
// It is guarded by multiLock so only one Consume per consumer instance can
// run at a time; a second concurrent call returns an error immediately.
// On exit the stop flags are raised so the worker goroutines terminate.
func (i *consumer) Consume(ctx *gin.Context) error {
	defer func() {
		if !i.running {
			i.forwardProcessClose = true
			i.tickerProcessClose = true
			i.handleProcessClose = true

			cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.Consume",
				"topic":  i.topic,
			}).Trace("cKafka consumer 已结束")
		}
	}()

	i.multiLock.Lock()
	if i.running {
		// BUG FIX: previously returned while still holding multiLock,
		// deadlocking every subsequent Consume call on this consumer.
		i.multiLock.Unlock()
		return errors.New("cKafka 已存在相同的消费者")
	}
	i.running = true
	i.multiLock.Unlock()

	var err error
	switch i.typ {
	case CONSUMER_TYPE_PARTITION:
		err = i.consumePartition(ctx)
	case CONSUMER_TYPE_GROUP:
		err = i.consumeGroup(ctx)
	}

	i.running = false

	return err
}

// consumePartition consumes the configured partitions directly (no group).
// It spawns: (1) an optional ticker goroutine that raises the flush flag
// every `wait` seconds, (2) an aggregation goroutine that batches messages up
// to `size` (or flushes on the flag), and (3) one fetch goroutine per
// partition. It blocks until all fetch goroutines finish.
func (i *consumer) consumePartition(ctx *gin.Context) error {
	// Ticker goroutine, stopped via tickerProcessClose.
	// Only active when a flush timeout (wait) is configured.
	go func(c *consumer) {
		if c.wait <= 0 {
			return
		}

		c.ticker = time.NewTicker(time.Duration(c.wait) * time.Second)
		defer c.ticker.Stop() // release the ticker when the goroutine exits

		for {
			select {
			case <-c.ticker.C:
				c.send = true
			default:
				if c.tickerProcessClose {
					return
				}
				// Avoid a hot busy-wait while polling the stop flag.
				time.Sleep(10 * time.Millisecond)
			}
		}
	}(i)

	// Aggregation goroutine, stopped via handleProcessClose.
	// Flushes the buffer when it reaches `size`, or when the ticker raised
	// the flush flag and the buffer is non-empty.
	go func(c *consumer) {
		for {
			select {
			case message := <-c.message:
				c.messages = append(c.messages, message)
				// No batching configured, or batch full: hand off now.
				if c.size <= 0 || (c.size > 0 && int64(len(c.messages)) >= c.size) {
					handlerCtx := cContext.New()
					c.Handler(handlerCtx, c.messages)
					c.messages = []*sarama.ConsumerMessage{}
					c.send = false
				}
			default:
				if c.handleProcessClose {
					return
				}
				// Flush-timeout signal from the ticker.
				if c.send && len(c.messages) > 0 {
					handlerCtx := cContext.New()
					c.Handler(handlerCtx, c.messages)
					c.messages = []*sarama.ConsumerMessage{}
					c.send = false
				}
				// Avoid a hot busy-wait while idle.
				time.Sleep(10 * time.Millisecond)
			}
		}
	}(i)

	// One fetch goroutine per partition, feeding the aggregation channel.
	for _, partition := range i.partitions {
		i.partitionWait.Add(1)
		go func(c *consumer, partition int32) {
			defer func() {
				c.partitionWait.Done()
				cLog.WithContext(ctx, map[string]interface{}{
					"source":    "cKafka.consumer.consumePartition",
					"topic":     c.topic,
					"partition": partition,
				}).Trace("cKafka partition consumer 消息提取协程已结束")
			}()

			pc, e := c.partitionConsumer.ConsumePartition(c.topic, partition, c.offset)
			if e != nil {
				cLog.WithContext(ctx, map[string]interface{}{
					"source":    "cKafka.consumer.consumePartition",
					"topic":     c.topic,
					"partition": partition,
					"err":       e.Error(),
				}).Error("cKafka 初始化 partition consumer 失败")
				// BUG FIX: previously fell through with a nil pc, appending
				// nil to partitionProcess and panicking on pc.Messages().
				return
			}

			// Several partition goroutines append concurrently — guard the
			// shared slice (this append raced in the original code).
			c.multiLock.Lock()
			c.partitionProcess = append(c.partitionProcess, pc)
			c.multiLock.Unlock()

			// Forward every fetched message to the aggregator.
			for message := range pc.Messages() {
				c.message <- message
			}
		}(i, partition)
	}
	i.partitionWait.Wait()

	return nil
}

// consumeGroup runs group-based consumption. When the handler is the package
// DefaultGroupHandler, it additionally spawns three goroutines: a flush
// ticker, an aggregator that batches up to `size` messages (or flushes on
// timeout), and a forwarder that relays messages from the handler's channel
// into the aggregator. Any other handler is passed straight to sarama.
//
// NOTE(review): the select loops below poll via a `default` branch, which
// busy-spins a core until the stop flags are raised — confirm this is
// intentional.
func (i *consumer) consumeGroup(ctx *gin.Context) error {
	// Note: cLog.WithContext(...) runs now; only Trace() is deferred.
	defer cLog.WithContext(ctx, map[string]interface{}{
		"source": "cKafka.consumer.consumeGroup",
		"topic":  i.topic,
	}).Trace("cKafka group consumer 已结束")

	cLog.WithContext(ctx, map[string]interface{}{
		"source":   "cKafka.consumer.consumeGroup",
		"topic":    i.topic,
		"client":   i.clientId,
		"consumer": i.id,
	}).Trace("cKafka 启动 group consumer")

	var err error
	groupConsumerCtx := context.Background()

	// Dispatch on the concrete handler type: only the default handler gets
	// the batching pipeline.
	switch i.GroupHandler.(type) {
	case *DefaultGroupHandler:
		// Ticker goroutine, stopped via tickerProcessClose.
		// Only active when a flush timeout (wait) is configured; raises the
		// send flag once the wait duration elapses.
		go func(c *consumer) {
			if c.wait <= 0 {
				return
			}

			cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.consumeGroup",
				"topic":  c.topic,
			}).Trace("cKafka group consumer 启动计时协程")

			defer cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.consumeGroup",
				"topic":  c.topic,
			}).Trace("cKafka group consumer 计时协程已结束")

			c.ticker = time.NewTicker(time.Duration(c.wait) * time.Second)

			for {
				select {
				case <-c.ticker.C:
					c.send = true
				default:
					if c.tickerProcessClose {
						return
					}
				}
			}
		}(i)

		// Aggregation goroutine, stopped via handleProcessClose.
		// Flushes the buffer when it reaches `size`, or when the ticker
		// raised the flush flag and the buffer is non-empty.
		go func(c *consumer) {
			cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.consumeGroup",
				"topic":  c.topic,
			}).Trace("cKafka group consumer 启动聚合消息处理协程")

			defer cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.consumeGroup",
				"topic":  c.topic,
			}).Trace("cKafka group consumer 聚合消息处理协程已结束")

			for {
				select {
				case message := <-c.message:
					c.messages = append(c.messages, message)
					if c.size <= 0 || (c.size > 0 && int64(len(c.messages)) >= c.size) {
						handlerCtx := cContext.New()
						c.Handler(handlerCtx, c.messages)
						c.messages = []*sarama.ConsumerMessage{}
						c.send = false
					}
				default:
					if c.handleProcessClose {
						return
					}
					if c.send && len(c.messages) > 0 {
						handlerCtx := cContext.New()
						c.Handler(handlerCtx, c.messages)
						c.messages = []*sarama.ConsumerMessage{}
						c.send = false
					}
				}
			}
		}(i)

		// Forwarding goroutine, stopped via forwardProcessClose: relays
		// messages received by the group handler into the aggregator.
		go func(c *consumer) {
			cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.consumeGroup",
				"topic":  c.topic,
			}).Trace("cKafka group consumer 启动消息转发协程")

			defer cLog.WithContext(ctx, map[string]interface{}{
				"source": "cKafka.consumer.consumeGroup",
				"topic":  c.topic,
			}).Trace("cKafka group consumer 消息转发协程已结束")

			for {
				select {
				case message := <-i.GroupHandler.(*DefaultGroupHandler).Messages:
					c.message <- message
				default:
					if c.forwardProcessClose {
						return
					}
				}
			}
		}(i)
		// Start the consumer (blocks until the session ends).
		err = i.groupConsumer.Consume(groupConsumerCtx, []string{i.topic}, i.GroupHandler)
	default:
		err = i.groupConsumer.Consume(groupConsumerCtx, []string{i.topic}, i.GroupHandler)
	}

	return err

}

// newConsumerLock serializes consumer construction so the container is not
// mutated concurrently.
var newConsumerLock = &sync.Mutex{}

// NewConsumer returns the consumer for the given option, reusing any client
// connection and consumer operator already registered in the container under
// the IDs derived from the option. New instances are registered before return.
func NewConsumer(option ConsumerOption) (*consumer, error) {
	var err error
	clientId, consumerId := option.GetID()

	newConsumerLock.Lock()
	defer newConsumerLock.Unlock()

	// Reuse the cached client connection when present.
	// BUG FIX: the original left `client` nil on a cache hit, so a consumer
	// built below would have been created from a nil sarama.Client.
	var client sarama.Client
	if cached := container.getClient(clientId); cached != nil {
		client, _ = cached.(sarama.Client)
	}
	if client == nil {
		client, err = sarama.NewClient(option.Brokers, option.Config)
		if err != nil {
			return nil, err
		}

		// Register the long-lived client connection in the container.
		container.saveClient(clientId, client)
	}

	// An operator already registered under this consumer ID is reused as-is.
	if op := container.getOperator(clientId, consumerId); op != nil {
		if c, ok := op.(*consumer); ok {
			return c, nil
		}
	}

	c := &consumer{
		id:                  consumerId,
		typ:                 option.Type,
		borkers:             option.Brokers,
		topic:               option.Topic,
		group:               option.Group,
		partitions:          option.Partitions,
		offset:              option.Offset,
		size:                option.Size,
		wait:                option.Wait,
		multi:               option.Multi,
		Handler:             func(ctx *gin.Context, msgs []*sarama.ConsumerMessage) {},
		GroupHandler:        &DefaultGroupHandler{Messages: make(chan *sarama.ConsumerMessage)},
		config:              option.Config,
		client:              client,
		clientId:            clientId,
		groupConsumer:       nil,
		partitionConsumer:   nil,
		partitionWait:       &sync.WaitGroup{},
		partitionProcess:    []sarama.PartitionConsumer{},
		message:             make(chan *sarama.ConsumerMessage),
		messages:            []*sarama.ConsumerMessage{},
		send:                false,
		ticker:              nil,
		deleted:             false,
		running:             false,
		multiLock:           &sync.Mutex{},
		tickerProcessClose:  false,
		handleProcessClose:  false,
		forwardProcessClose: false,
	}

	switch option.Type {
	case CONSUMER_TYPE_PARTITION:
		c.partitionConsumer, err = sarama.NewConsumerFromClient(client)
		if err != nil {
			return nil, err
		}
		// Default to every partition of the topic when none were requested.
		if len(c.partitions) == 0 {
			c.partitions, err = c.partitionConsumer.Partitions(c.topic)
			if err != nil {
				return nil, err
			}
		}
	case CONSUMER_TYPE_GROUP:
		c.groupConsumer, err = sarama.NewConsumerGroupFromClient(c.group, client)
		if err != nil {
			return nil, err
		}
	}

	container.saveOperator(clientId, consumerId, c)

	return c, nil
}

// DefaultGroupHandler is the default sarama.ConsumerGroupHandler: it forwards
// every claimed message into Messages, which consumeGroup relays into the
// consumer's batching pipeline.
type DefaultGroupHandler struct {
	Messages chan *sarama.ConsumerMessage
}

// Setup implements sarama.ConsumerGroupHandler; no per-session setup is needed.
func (i *DefaultGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup implements sarama.ConsumerGroupHandler; no per-session cleanup is needed.
func (i *DefaultGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim forwards every claimed message into i.Messages and marks it
// consumed so the group's offset is committed.
func (i *DefaultGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// NOTE(review): stray debug print of the raw payload via the stdlib
		// logger — consider routing through cLog or removing entirely.
		log.Println(string(message.Value))
		i.Messages <- message
		// BUG FIX: without MarkMessage the group never commits offsets, so a
		// restarted consumer reprocesses already-handled messages.
		session.MarkMessage(message, "")
	}
	return nil
}

// GetConsumer builds (or fetches from the container) a consumer for the named
// connection in kafka_config, configured with the given consumer type. The
// returned consumer is deduplicated by an ID derived from every connection-
// affecting setting, so equal configurations share one client connection.
func GetConsumer(name string, typ ConsumerType) (instance *consumer, err error) {
	// Indexing a nil map is safe in Go, so one lookup covers both the
	// "no config table" and "entry missing" cases checked separately before.
	conf, confok := kafka_config.Connections[name]
	if !confok || conf == nil {
		return nil, fmt.Errorf("cKafka 配置不存在[%s]", name)
	}

	if conf.Topic == "" {
		return nil, fmt.Errorf("cKafka 配置异常[%s.Topics]", name)
	}

	if len(conf.Brokers) == 0 {
		return nil, fmt.Errorf("cKafka 配置异常[%s.Brokers]", name)
	}

	if conf.Consumer == nil {
		return nil, fmt.Errorf("cKafka 配置不存在[%s.Consumer]", name)
	}

	// These mirror the sarama config and feed the identity string below.
	var tlsCert, tlsKey string
	var tlsEnable, tlsInsecureSkipVerify bool
	var keepAlive, dialTimeout, readTimeout, writeTimeout, heartbeat int64
	var maxOpenRequests int

	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	config.Consumer.Return.Errors = true
	if conf.Consumer.Hearbeat > 0 {
		heartbeat = conf.Consumer.Hearbeat
		// NOTE(review): a *producer* ack setting driven by a consumer
		// heartbeat option, and the heartbeat value is never applied to
		// config.Consumer.Group.Heartbeat — looks copy-pasted; confirm intent.
		config.Producer.RequiredAcks = sarama.WaitForAll
	}

	if conf.Net != nil {
		netConf := conf.Net
		if netConf.Keepalive > 0 {
			keepAlive = netConf.Keepalive
			config.Net.KeepAlive = time.Duration(netConf.Keepalive) * time.Second
		}
		if netConf.MaxOpenRequests > 0 {
			maxOpenRequests = netConf.MaxOpenRequests
			config.Net.MaxOpenRequests = netConf.MaxOpenRequests
		}
		if netConf.DialTimeout > 0 {
			dialTimeout = netConf.DialTimeout
			config.Net.DialTimeout = time.Duration(netConf.DialTimeout) * time.Second
		}
		if netConf.ReadTimeout > 0 {
			readTimeout = netConf.ReadTimeout
			config.Net.ReadTimeout = time.Duration(netConf.ReadTimeout) * time.Second
		}
		if netConf.WriteTimeout > 0 {
			writeTimeout = netConf.WriteTimeout
			config.Net.WriteTimeout = time.Duration(netConf.WriteTimeout) * time.Second
		}
		if netConf.TLS != nil && netConf.TLS.Enable {
			tlsCert = netConf.TLS.Cert
			tlsKey = netConf.TLS.Key
			tlsConfig, e := tls.NewConfig(netConf.TLS.Cert, netConf.TLS.Key)
			if e != nil {
				return nil, e
			}

			tlsEnable = netConf.TLS.Enable
			tlsInsecureSkipVerify = netConf.TLS.Skip

			config.Net.TLS.Enable = true
			config.Net.TLS.Config = tlsConfig
			config.Net.TLS.Config.InsecureSkipVerify = netConf.TLS.Skip
		}
	}

	// Identity string: every connection-affecting knob feeds the consumer/
	// client ID so distinct configurations get distinct connections.
	id := fmt.Sprintf("%s:%t:%t:%d:%d:%d:%d:%d:%d:%t:%t:%s:%s",
		typ,
		config.Producer.Return.Successes,
		config.Producer.Return.Errors,
		heartbeat,
		keepAlive,
		maxOpenRequests,
		dialTimeout,
		readTimeout,
		writeTimeout,
		tlsEnable,
		tlsInsecureSkipVerify,
		tlsCert,
		tlsKey)

	return NewConsumer(ConsumerOption{
		ID:         id,
		Type:       typ,
		Config:     config,
		Brokers:    conf.Brokers,
		Topic:      conf.Topic,
		Group:      conf.Consumer.Group,
		Partitions: conf.Consumer.Partitions,
		Offset:     conf.Consumer.Offset,
		Size:       conf.Consumer.Size,
		Wait:       conf.Consumer.Wait,
		Multi:      conf.Consumer.Multi,
	})
}

// GetPartitionConsumer returns the consumer for the named connection in
// partition mode.
func GetPartitionConsumer(name string) (instance *consumer, err error) {
	return GetConsumer(name, CONSUMER_TYPE_PARTITION)
}

// GetGroupConsumer returns the consumer for the named connection in
// consumer-group mode.
func GetGroupConsumer(name string) (instance *consumer, err error) {
	return GetConsumer(name, CONSUMER_TYPE_GROUP)
}

// DefaultConsumer returns the partition-mode consumer for the "default"
// connection entry.
func DefaultConsumer() (instance *consumer, err error) {
	return GetConsumer("default", CONSUMER_TYPE_PARTITION)
}
